配對交易 Pair Trading

如何操作

如果兩個相關性較高的股票或者其他證券之間出現背離,就應該買進表現相對較差的,賣出表現相對較好的。當未來兩者之間的背離得到糾正,那麼可以進行相反的平倉操作來獲取利潤。其本質上是一個反轉投資策略。

配對交易缺點

雖然該策略沒有太大的下行風險,但機會匱乏,而且,為了獲利,交易者必須是最先利用機會的人之一。其他一些風險包括:
• 市場風險的衡量標準(例如 beta)是根據歷史資料估計的,未來的實際數值可能與過去不同。
• 如果實施均值回歸策略,那麼假設未來的均值將與過去一樣保持不變。

配對交易步驟

Step 1:抓資料(2014年~2018年)及清理

# data = read.csv("C:/Users/amyhs/Desktop/碩士課程/時間序列/報告/期末報告/塑化類股.csv" , header = FALSE , sep = ",")
# codes = unlist(as.character(data[,1]))
# 
# start = "2014-01-01"
# end = "2018-12-31"
# 
# df.stock = getSymbols(codes[1] , src = 'yahoo' , auto.assign = FALSE , from = start , to = end)
# df.stock = Ad(df.stock)
# 
# 
# for (code in codes[2 : length(codes)]){
#   stocks = getSymbols(code , src = 'yahoo' , auto.assign = FALSE , from = start , to = end)
#   df.stock = cbind(df.stock , Ad(stocks))
# }
# 
# colnames(df.stock) = codes
# 
# save(df.stock , file = 'C:/Users/amyhs/Desktop/碩士課程/時間序列/報告/期末報告/塑化類股.RData')

# Load the cached price data saved by the (commented-out) download step above.
# The file is expected to provide the object `df.stock`, which is used below.
load('C:/Users/amyhs/Desktop/碩士課程/時間序列/報告/期末報告/塑化類股.RData')

# Drop every row that contains a missing value in any column.
df <- na.omit(df.stock)

# Rename each column to "x" followed by the first four characters of its
# original name. Done in one vectorised call, which also copes with a
# zero-column object (the previous 1:length(...) loop did not).
colnames(df) <- str_c("x", str_sub(colnames(df), 1, 4))

# In-sample window: 2014 through 2017.
df_history <- df["2014/2017", ]

Step 2:對股價做單根檢定

找出股價不穩定(非定態)的個股:
1. 此次 ADF 檢定選用 tseries 套件,該套件的 ADF 檢定預設落後期數為 trunc((length(x)-1)^(1/3)),trunc 代表取整數部分。
2. 此次報告所選 2014~2017 的股價數據共有 979 筆,因此預設落後期數為 9;而 fUnitRoots 套件中 ADF 檢定的預設落後期數為 1。
3. 考慮到落後期數不同時,檢定結果會有較大差異,因此我們每次做檢定都分別考慮 lag = 1、3、5、7、9 五種情況,並選取其中最小的 p-value。

# Significance level used when filtering series by their ADF p-value.
alpha <- 0.05

# Smallest ADF p-value of a series over several lag orders.
#
# Args:
#   x:    the series to test; passed unchanged to adf.test().
#   lags: lag orders passed one at a time as adf.test(x, k = lag).
#         Defaults to 1, 3, 5, 7, 9, the set this script has always used.
#
# Returns:
#   A single number: the minimum of the p-values reported for the given lags.
adf <- function(x, lags = seq(1, 9, 2)) {
  stopifnot(length(lags) > 0)
  p_values <- vapply(
    lags,
    function(lag) adf.test(x, k = lag)[["p.value"]],
    numeric(1)
  )
  min(p_values)
}


# Keep only the columns whose smallest ADF p-value is at least alpha, i.e. the
# series for which the unit-root test does not reject at any tested lag.
unit_root_kept <- vapply(
  df_history,
  function(prices) adf(prices) >= alpha,
  logical(1)
)
df_history <- df_history[, unit_root_kept]

# Number of remaining price columns.
num <- ncol(df_history)

Step 3:兩兩建迴歸取出殘差

# For every ordered pair (i, j) of distinct price columns, regress column i on
# column j and append the regression residuals to df_history as a new column.
# The new column is named by concatenating the two price-column names
# (column i first, then column j).
#
# A column is never paired with itself, so no self-regression residuals are
# created and nothing has to be blanked out and removed afterwards. The
# resulting layout is unchanged: the first `num` columns are prices, followed
# by the residual columns in (i, j) loop order.
for (i in seq_len(num)) {
  for (j in seq_len(num)) {
    if (i == j) {
      next
    }
    pair_residual <- lm(df_history[, i] ~ df_history[, j])[["residuals"]]
    df_history <- merge(df_history, pair_residual)
    # The column just merged is the last one; give it its pair name.
    colnames(df_history)[ncol(df_history)] <-
      str_c(colnames(df_history)[i], colnames(df_history)[j])
  }
}

Step 4:將殘差做單根檢定

找出最穩定的殘差

# 計算RESIDUAL P-VALUE
# Smallest ADF p-value of every residual column; the residual columns are the
# ones located after the first `num` (price) columns.
residual_cols <- (num + 1):ncol(df_history)
risidual_p <- vapply(
  residual_cols,
  function(col) adf(df_history[, col]),
  numeric(1)
)

# Residual series with the smallest p-value.
best_col <- which.min(risidual_p) + num
risidual_min <- df_history[, best_col]

# Split the residual column name back into its two halves: characters 1-5 and
# characters 6-10.
pair_label <- colnames(risidual_min)
name1 <- str_sub(pair_label, 1, 5)
name2 <- str_sub(pair_label, 6, 10)

name1
## [1] "x1301"
name2
## [1] "x1304"

取出歷史資料

# In-sample price series of the two selected stocks.
stock1_history <- df_history[, name1]
stock2_history <- df_history[, name2]

殘差圖形

# Plot the selected residual series.
plot(risidual_min, main = "residual")

兩股票趨勢(2014~2017)

2014~2017

# Data-frame copy of the in-sample data, used as the plotly data source.
database1 <- as.data.frame(df_history)

# Labels reused for the legend entries and the axis titles.
label1 <- sprintf('%s.TW', name1)
label2 <- sprintf('%s.TW', name2)

# Range-selector buttons: backward windows of 3m, 6m, 1y, 3y, 5y, plus "all".
make_button <- function(count, label, step) {
  list(count = count, label = label, step = step, stepmode = "backward")
}
range_buttons <- list(
  make_button(3, "3m", "month"),
  make_button(6, "6m", "month"),
  make_button(1, "1y", "year"),
  make_button(3, "3y", "year"),
  make_button(5, "5y", "year"),
  list(step = "all")
)

# Two line traces: name1 in gray on the left axis, name2 in blue on a second
# y axis ("y2") that overlays the first and is drawn on the right.
p <- plot_ly(data = database1, x = index(df_history))
p <- add_lines(
  p,
  y = database1[, name1],
  type = "scatter",
  mode = "lines",
  line = list(color = 'gray'),
  name = label1
)
p <- add_lines(
  p,
  y = database1[, name2],
  type = "scatter",
  mode = "lines",
  line = list(color = '#28B0FE'),
  yaxis = "y2",
  name = label2
)
p <- layout(
  p,
  title = sprintf('%s.TW vs %s.TW', name1, name2),
  xaxis = list(
    rangeselector = list(buttons = range_buttons),
    rangeslider = list(type = "date")
  ),
  yaxis = list(side = 'left', title = label1),
  yaxis2 = list(title = label2, overlaying = "y", side = "right")
)

offline(p)

取出2018資料

# Rows of the cleaned data that fall in 2018.
df_2018 <- df["2018", ]

# 2018 price series of the two selected stocks.
stock1 <- df_2018[, name1]
stock2 <- df_2018[, name2]

2018

# Rows of the cleaned data that fall in 2018, and a data-frame copy of them
# used as the plotly data source.
df_2018 <- df["2018", ]
database2 <- as.data.frame(df_2018)

# Labels reused for the legend entries and the axis titles.
label1 <- sprintf('%s.TW', name1)
label2 <- sprintf('%s.TW', name2)

# Range-selector buttons: backward windows of 3m, 6m, 1y, 3y, 5y, plus "all".
make_button <- function(count, label, step) {
  list(count = count, label = label, step = step, stepmode = "backward")
}
range_buttons <- list(
  make_button(3, "3m", "month"),
  make_button(6, "6m", "month"),
  make_button(1, "1y", "year"),
  make_button(3, "3y", "year"),
  make_button(5, "5y", "year"),
  list(step = "all")
)

# Two line traces: name1 in gray on the left axis, name2 in blue on a second
# y axis ("y2") that overlays the first and is drawn on the right.
p_2018 <- plot_ly(data = database2, x = index(df_2018))
p_2018 <- add_lines(
  p_2018,
  y = database2[, name1],
  type = "scatter",
  mode = "lines",
  line = list(color = 'gray'),
  name = label1
)
p_2018 <- add_lines(
  p_2018,
  y = database2[, name2],
  type = "scatter",
  mode = "lines",
  line = list(color = '#28B0FE'),
  yaxis = "y2",
  name = label2
)
p_2018 <- layout(
  p_2018,
  title = sprintf('%s.TW vs %s.TW', name1, name2),
  xaxis = list(
    rangeselector = list(buttons = range_buttons),
    rangeslider = list(type = "date")
  ),
  yaxis = list(side = 'left', title = label1),
  yaxis2 = list(title = label2, overlaying = "y", side = "right")
)

offline(p_2018)

交易策略

利用迴歸殘差最穩定的兩標的做配對交易

計算歷史截距項及beta

# Regress stock 1 on stock 2 over the in-sample period. The model is fitted
# once and both coefficients are read from the same fit.
history_coef <- lm(stock1_history ~ stock2_history)[["coefficients"]]
intercept <- history_coef[[1]]  # first coefficient: the intercept
beta <- history_coef[[2]]       # second coefficient: slope on stock 2

# Slope of the same regression fitted on the 2018 data. It is not referenced
# anywhere else in the visible script.
beta_2018 <- lm(stock1 ~ stock2)[["coefficients"]][[2]]

畫出2018價差圖形(以歷史迴歸係數計算)

# Spread of the 2018 prices (stock1, stock2), computed with the in-sample
# regression coefficients `beta` and `intercept`.
hedged_leg <- stock2 * beta
spread <- stock1 - hedged_leg - intercept
plot(spread, main = "spread")

畫出歷史(2014~2017)價差圖形

# Spread of the in-sample prices (stock1_history, stock2_history), computed
# with the same regression coefficients `beta` and `intercept`.
hedged_leg_history <- stock2_history * beta
spread_history <- stock1_history - hedged_leg_history - intercept
plot(spread_history, main = "spread")

將價差標準化

# Standardise `spread` with the mean and standard deviation of
# `spread_history`.
spread_mean <- mean(spread_history)
spread_sd <- sd(spread_history)
zscore <- (spread - spread_mean) / spread_sd
colnames(zscore) <- "zscore"
plot(zscore, main = "zscore")

建立投資策略

z-score 交易訊號
> 1.28 放空
-0.25~0.25 平倉
< -1.28 買進
# One row per observation: the z-score plus the two price series.
portfolio <- as.data.frame(cbind(zscore, stock1, stock2))

# Continuously compounded rate of change of each price series.
portfolio$return1 <- ROC(portfolio[, name1], type = "continuous")
portfolio$return2 <- ROC(portfolio[, name2], type = "continuous")

# Position column: starts as NA everywhere except the first row, which starts
# at 0. Rows that trigger none of the rules below keep their NA.
portfolio$position <- NA
portfolio[1, "position"] <- 0
portfolio$return <- NA

# Signal thresholds on the z-score.
entry_z <- 1.28
exit_z <- 0.25

# The three conditions are mutually exclusive, so they are applied as
# vectorised masks instead of a row-by-row loop:
#   z >  entry_z      -> -1
#   z < -entry_z      ->  1
#   |z| < exit_z      ->  0
z_now <- portfolio[["zscore"]]
portfolio$position[z_now > entry_z] <- -1
portfolio$position[z_now < -entry_z] <- 1
portfolio$position[abs(z_now) < exit_z] <- 0


head(portfolio)
##              zscore    x1301    x1304      return1      return2 position
## 2018-01-02 1.959546 93.10741 15.82419           NA           NA       -1
## 2018-01-03 2.042126 93.48705 15.63236  0.004069140 -0.012196532       -1
## 2018-01-04 2.021458 93.48705 15.72822  0.000000000  0.006113745       -1
## 2018-01-05 2.000797 93.39214 15.77625 -0.001015736  0.003049156       -1
## 2018-01-08 2.103826 94.43617 15.82419  0.011116884  0.003033631       -1
## 2018-01-09 2.402664 97.28348 15.87212  0.029705138  0.003024519       -1
##            return
## 2018-01-02     NA
## 2018-01-03     NA
## 2018-01-04     NA
## 2018-01-05     NA
## 2018-01-08     NA
## 2018-01-09     NA

fill函數將持有部位放入

# Carry the last non-NA position downward into the following NA rows.
portfolio <- fill(portfolio, position, .direction = "down")

計算報酬

# Strategy return per row: (return1 - beta * return2) times the position held
# over that row's return period.
#
# The position in row t is derived from row t's z-score, which is computed from
# the same prices that end row t's return. Multiplying the two would use
# information that is only available after the return has been realised
# (look-ahead). The position from the previous row is used instead; the first
# row has no previous position and gets NA.
# NOTE(review): beta is a slope estimated on price levels but is applied here
# to log returns - confirm this weighting is intended.
held_position <- c(NA, portfolio$position)[seq_len(nrow(portfolio))]
portfolio$return <-
  (portfolio$return1 - beta * portfolio$return2) * held_position

交易結果

# Drop rows that contain any NA.
portfolio <- na.omit(portfolio)

# Cumulative return: exp of the running sum of the per-row returns. cumsum()
# gives every prefix sum in one pass and also handles an empty table, unlike
# the previous 1:length(...) loop that re-summed each prefix.
portfolio$cumsum_re <- exp(cumsum(portfolio$return))

plot(portfolio$cumsum_re, type = "l", main = "cumsum return")